package com.stars.easyms.schedule.record;

import com.stars.easyms.schedule.bean.BatchResult;
import com.stars.easyms.schedule.factory.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Base class for asynchronous tasks that buffer submitted items and process them in batches.
 *
 * <p>Items handed to {@link #offer(Object)} are placed on an internal queue. A keep-alive worker
 * thread drains the queue, groups the items into batches and passes each batch to
 * {@link #execute(List)}. Additional short-lived workers are started while the backlog exceeds
 * the batch size. Items that fail are kept in per-attempt buckets and retried after
 * {@value #RETRY_INTERVAL_MILLIS} ms, at most {@link #failRetryCount()} times; after that they
 * are dropped.
 *
 * <p>All internal state is created lazily on the first call to {@link #offer(Object)}.
 *
 * @param <T> type of the items being processed
 * @author guoguifang
 */
abstract class BaseAsynchronousTask<T> {

    /**
     * Minimum time, in milliseconds, a failed item waits before it is attempted again.
     */
    private static final long RETRY_INTERVAL_MILLIS = 60000L;

    /**
     * Longest time, in seconds, the keep-alive worker sleeps while there is nothing to do.
     */
    private static final long IDLE_WAIT_SECONDS = 60L;

    /**
     * Lower bound applied to {@link #batchExecuteMaxCountPerTime()}.
     */
    private static final int MIN_BATCH_SIZE = 500;

    /**
     * Logger, created lazily for the concrete subclass.
     */
    private Logger logger;

    /**
     * Queue of items waiting to be processed. {@code null} until the first {@link #offer(Object)};
     * volatile because it is checked outside {@link #mainLock} and its assignment publishes every
     * other lazily initialised field.
     */
    private volatile Queue<T> queue;

    /**
     * Failed items keyed by the number of retries already performed (0 = failed once, never
     * retried). Each value is a synchronized list; iteration must hold the list's own monitor.
     */
    private Map<Integer, List<ErrorRecord<T>>> errorDataListMap;

    /**
     * Number of retries after a failure (never negative; 0 disables retrying).
     */
    private int failRetryCount;

    /**
     * Time in milliseconds a worker waits for more items before submitting a partial batch.
     * A longer delay produces larger batches (less pressure on the backing store) at the cost
     * of latency.
     */
    private long delayTime;

    /**
     * Maximum number of items passed to a single {@link #execute(List)} call.
     */
    private int batchExecuteMaxCountPerTime;

    /**
     * Pool that runs the worker threads.
     */
    private ThreadPoolExecutor executor;

    /**
     * Number of workers currently accounted for (including the keep-alive worker).
     */
    private AtomicInteger workCount;

    /**
     * Upper bound for {@link #workCount}.
     */
    private int maxWorkCount;

    /**
     * Guards lazy initialisation and the idle-wait condition.
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Signalled by {@link #offer(Object)} to wake an idle keep-alive worker.
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * Runs one batch; annotated so that each batch is meant to run in its own transaction.
     *
     * <p>NOTE(review): within this file the method is only invoked directly by the internal
     * worker (self-invocation) and it is {@code final}. With Spring's default proxy-based
     * transaction support such calls are presumably not intercepted, so the annotation may have
     * no effect unless AspectJ weaving is used - confirm against the application configuration.
     *
     * @param list the batch to process
     * @return the result reported by {@link #execute(List)}
     * @throws Exception whatever {@link #execute(List)} throws
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = {Exception.class})
    public final BatchResult doExecute(List<T> list) throws Exception {
        return execute(list);
    }

    /**
     * Processes one batch of items asynchronously.
     *
     * @param list the batch of items to process
     * @return the batch result; its fail-data list is scheduled for retry. May be {@code null},
     *         in which case nothing is scheduled for retry.
     * @throws Exception on failure; the whole batch is then scheduled for retry
     */
    protected abstract BatchResult execute(List<T> list) throws Exception;

    /**
     * Number of retries after a failure (default 3). Values below 0 are treated as 0, which
     * disables retrying.
     */
    protected int failRetryCount() {
        return 3;
    }

    /**
     * Delay in milliseconds before a partial batch is submitted (default 100). Values below 1
     * are treated as 1.
     */
    protected long delayTime() {
        return 100;
    }

    /**
     * Maximum number of items per batch (default 3000). Values below 500 are treated as 500.
     */
    protected int batchExecuteMaxCountPerTime() {
        return 3000;
    }

    /**
     * Maximum number of worker threads (default 2). Values below 1 are treated as 1.
     */
    protected int maxWorkCount() {
        return 2;
    }

    /**
     * Queues an item for asynchronous batch processing and wakes the keep-alive worker.
     * The first call initialises the queue, the retry buckets and the worker pool.
     *
     * @param t the item to process
     */
    protected void offer(T t) {
        Queue<T> target = this.queue;
        if (target == null) {
            target = initialize();
        }
        target.offer(t);
        this.mainLock.lock();
        try {
            this.termination.signalAll();
        } finally {
            this.mainLock.unlock();
        }
    }

    /**
     * Creates all lazily initialised state exactly once and starts the keep-alive worker.
     *
     * @return the item queue (newly created, or the one another thread created first)
     */
    private Queue<T> initialize() {
        this.mainLock.lock();
        try {
            Queue<T> created = this.queue;
            if (created == null) {
                this.logger = LoggerFactory.getLogger(this.getClass());
                // Each hook is called once so an override cannot yield two different values.
                this.failRetryCount = Math.max(0, this.failRetryCount());
                this.delayTime = Math.max(1L, this.delayTime());
                this.batchExecuteMaxCountPerTime = Math.max(MIN_BATCH_SIZE, this.batchExecuteMaxCountPerTime());
                this.maxWorkCount = Math.max(1, this.maxWorkCount());

                Map<Integer, List<ErrorRecord<T>>> buckets = new ConcurrentHashMap<>();
                for (int failCount = 0; failCount < this.failRetryCount; failCount++) {
                    buckets.put(failCount, Collections.synchronizedList(new ArrayList<ErrorRecord<T>>()));
                }
                this.errorDataListMap = buckets;

                // The keep-alive worker is counted before it is submitted so that it never
                // observes a count of zero.
                this.workCount = new AtomicInteger(1);
                this.executor = new ThreadPoolExecutor(1, this.maxWorkCount, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new DefaultThreadFactory().setNameFormat("AsynThread-" + this.getClass().getSimpleName() + "-%d").build());

                // Volatile write last: a thread that sees a non-null queue also sees everything
                // assigned above. It must precede starting the worker, which reads the field.
                created = new LinkedBlockingQueue<>();
                this.queue = created;
                this.executor.execute(new Work(true));
            }
            return created;
        } finally {
            this.mainLock.unlock();
        }
    }

    /**
     * A failed item together with the time of its most recent failure.
     */
    private static final class ErrorRecord<E> {

        private final E data;

        private final long failedAt;

        private ErrorRecord(E data, long failedAt) {
            this.data = data;
            this.failedAt = failedAt;
        }
    }

    /**
     * Worker loop. The keep-alive instance runs indefinitely and sleeps while idle; the other
     * instances run only while the backlog is larger than one batch.
     */
    private class Work implements Runnable {

        private final boolean keepAlive;

        private Work(boolean keepAlive) {
            this.keepAlive = keepAlive;
        }

        @Override
        public void run() {
            try {
                while (this.keepAlive || queue.size() > batchExecuteMaxCountPerTime) {
                    if (this.keepAlive && queue.isEmpty() && errorDataIsEmpty()) {
                        awaitSignal(TimeUnit.SECONDS.toNanos(IDLE_WAIT_SECONDS), true);
                        continue;
                    }
                    startExtraWorkerIfNeeded();
                    if (!queue.isEmpty()) {
                        drainQueue();
                    }
                    if (failRetryCount > 0 && !errorDataIsEmpty()) {
                        long waitMillis = retryDueErrorData();
                        if (this.keepAlive && queue.isEmpty()) {
                            awaitSignal(TimeUnit.MILLISECONDS.toNanos(waitMillis), false);
                        }
                    }
                }
            } finally {
                // Release this worker's slot even if the loop ended with an unexpected exception.
                executor.setCorePoolSize(workCount.decrementAndGet());
            }
        }

        /**
         * Sleeps on the condition until signalled by {@code offer} or until the timeout elapses.
         * The emptiness check is repeated under the lock so that an item offered in between is
         * not missed.
         *
         * @param timeoutNanos      maximum time to wait
         * @param onlyIfNoErrorData when {@code true}, also skip waiting if retry data is pending
         */
        private void awaitSignal(long timeoutNanos, boolean onlyIfNoErrorData) {
            mainLock.lock();
            try {
                if (queue.isEmpty() && (!onlyIfNoErrorData || errorDataIsEmpty())) {
                    termination.awaitNanos(timeoutNanos);
                }
            } catch (InterruptedException e) {
                // An interrupt is only logged; the worker keeps running.
                logger.debug(e.getMessage());
            } finally {
                mainLock.unlock();
            }
        }

        /**
         * Starts one more non-keep-alive worker when the backlog warrants it. The slot is
         * reserved with a compare-and-set so that concurrent workers can never push the count
         * (and therefore the core pool size) above {@code maxWorkCount}.
         */
        private void startExtraWorkerIfNeeded() {
            int current = workCount.get();
            if (current < maxWorkCount
                    && queue.size() / batchExecuteMaxCountPerTime > current - 1
                    && workCount.compareAndSet(current, current + 1)) {
                // The pool uses an unbounded queue, so it only grows when the core size is raised.
                executor.setCorePoolSize(current + 1);
                executor.execute(new Work(false));
            }
        }

        /**
         * Polls the queue into batches and executes them. When the queue runs dry the worker
         * waits {@code delayTime} once for late arrivals before submitting the partial batch.
         */
        private void drainQueue() {
            List<T> batch = new ArrayList<>();
            boolean delayed = false;
            T item = queue.poll();
            while (item != null) {
                batch.add(item);
                if (batch.size() >= batchExecuteMaxCountPerTime) {
                    executeBatch(batch, 0);
                    batch.clear();
                }
                item = queue.poll();
                if (item == null) {
                    if (delayed) {
                        break;
                    }
                    try {
                        Thread.sleep(delayTime);
                    } catch (InterruptedException e) {
                        logger.debug(e.getMessage());
                    }
                    delayed = true;
                    item = queue.poll();
                }
            }
            if (!batch.isEmpty()) {
                executeBatch(batch, 0);
            }
        }

        /**
         * Retries every failed item whose retry interval has elapsed. Buckets are visited from
         * the highest attempt count downwards, so an item that fails again and moves up one
         * bucket is not retried twice in the same pass.
         *
         * @return milliseconds until the next pending item becomes due (at most the retry interval)
         */
        private long retryDueErrorData() {
            long waitMillis = RETRY_INTERVAL_MILLIS;
            for (int failCount = failRetryCount - 1; failCount >= 0; failCount--) {
                List<ErrorRecord<T>> bucket = errorDataListMap.get(failCount);
                List<T> due = new ArrayList<>();
                // A synchronized list must be locked manually while iterating; other workers
                // may be appending to it concurrently.
                synchronized (bucket) {
                    long now = System.currentTimeMillis();
                    Iterator<ErrorRecord<T>> iterator = bucket.iterator();
                    while (iterator.hasNext()) {
                        ErrorRecord<T> record = iterator.next();
                        long overTime = now - record.failedAt;
                        if (overTime < RETRY_INTERVAL_MILLIS) {
                            // Not due yet: remember the shortest remaining time.
                            waitMillis = Math.min(waitMillis, RETRY_INTERVAL_MILLIS - overTime);
                            continue;
                        }
                        iterator.remove();
                        due.add(record.data);
                    }
                }
                // Execute outside the lock so other workers are not blocked during the batch.
                for (int from = 0; from < due.size(); from += batchExecuteMaxCountPerTime) {
                    int to = Math.min(from + batchExecuteMaxCountPerTime, due.size());
                    executeBatch(new ArrayList<>(due.subList(from, to)), failCount + 1);
                }
            }
            return waitMillis;
        }

        /**
         * Executes one batch and records its failures in the given bucket: the fail-data list
         * of the result on normal completion, or the whole batch when an exception is thrown.
         *
         * @param batch         items to process
         * @param failureBucket bucket that receives failed items
         */
        // The fail-data list of BatchResult is passed on as List<T>, exactly as before.
        @SuppressWarnings("unchecked")
        private void executeBatch(List<T> batch, int failureBucket) {
            try {
                BatchResult batchResult = doExecute(batch);
                if (batchResult != null) {
                    addErrorDatas(failureBucket, batchResult.getFailDatas());
                }
            } catch (Exception e) {
                logger.error("执行异步任务失败!", e);
                addErrorDatas(failureBucket, batch);
            }
        }

        /**
         * @return {@code true} when no bucket holds items waiting for a retry
         */
        private boolean errorDataIsEmpty() {
            for (List<ErrorRecord<T>> bucket : errorDataListMap.values()) {
                if (!bucket.isEmpty()) {
                    return false;
                }
            }
            return true;
        }

        /**
         * Stores failed items, stamped with the current time, in the bucket for the given
         * attempt count. Does nothing when retrying is disabled, when the retries are exhausted
         * ({@code failCount >= failRetryCount}) or when the list is {@code null} or empty.
         */
        private void addErrorDatas(int failCount, List<T> list) {
            if (failCount >= failRetryCount || list == null || list.isEmpty()) {
                return;
            }
            long now = System.currentTimeMillis();
            List<ErrorRecord<T>> records = new ArrayList<>(list.size());
            for (T t : list) {
                records.add(new ErrorRecord<>(t, now));
            }
            errorDataListMap.get(failCount).addAll(records);
        }
    }
}